package com.dahuan.tables;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.GroupWindow;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Table_GroupBy_Window {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //指定事件时间
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. 读入文件数据，得到DataStream
        DataStream<String> inputStream = env.readTextFile("E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt");

        // 3. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>( Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // 4. 将流转换成表，定义时间特性
        Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime");

        // 5.窗口操作 (Group by)
        // 5.1 table API
        //TODO 步长10秒 (定义了十秒内时间下的滚动窗口)
        Table tableAPI = dataTable.window( Tumble.over( "10.seconds" ).on( "rt" ).as( "tw" ) )
                .groupBy( "id,tw" ) //TODO  以属性 id 和窗口 tw 作为分组的 key
                .select( "id,id.count,temp.avg,tw.end" );

        //5.2 SQL
        //TODO 创建临时表
        tableEnv.createTemporaryView( "sensor", dataTable);
        //TODO 定义一个滚动窗口，第一个参数是时间字段，第二个参数是窗口长度。
        String sql = "select id,count(id) as cnt,avg(temp) as avgTemp,tumble_end(rt,interval '10' second)" +
                     "from sensor group by id , tumble(rt,interval '10' second) ";

        Table sqlQuery = tableEnv.sqlQuery( sql );

        tableEnv.toAppendStream( tableAPI, Row.class ).print("tableAPI");
        tableEnv.toAppendStream( sqlQuery,Row.class ).print("SQL");



        env.execute("Table_GroupBy_Window");
    }
}
